
feat(workflows): honor max_concurrency in fan-out via a bounded thread pool #3224

Merged
mnriem merged 9 commits into github:main from doquanghuy:feat/3222-fanout-max-concurrency
Jun 30, 2026


doquanghuy commented Jun 29, 2026

Contributor

Description

Closes #3222.

The fan-out step has carried a max_concurrency field since the workflow engine landed (#2158), but the engine ignored it: _execute_steps ran fan-out items in a sequential for loop and max_concurrency was only recorded in the step output. This honors it.

A new WorkflowEngine._run_fan_out runs items on a bounded ThreadPoolExecutor when max_concurrency > 1, and takes the existing sequential path when <= 1 (the default) — so existing workflows are byte-for-byte unchanged. Results are always assembled in item order (a preallocated slot per item, collected in submission order), never completion order, so fan-in — which reads them positionally — is unaffected. The parentId:templateId:index id grammar and halt-on-first-failure are preserved; max_concurrency is coerced with int(): a value that cannot be coerced (None, a non-numeric string) or that coerces to <= 1 runs sequentially, while a numeric string like "4" or a float like 4.0 is honored.
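The coercion rule and the item-order guarantee described above can be illustrated with a minimal sketch. This is not the engine's code: `coerce_max_concurrency` and `run_in_item_order` are hypothetical names, and the sketch only assumes that a value which `int()` rejects, or which coerces to `<= 1`, means "run sequentially".

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Sequence


def coerce_max_concurrency(value: Any) -> int:
    """Coerce with int(); non-coercible or <= 1 values mean sequential (1)."""
    try:
        n = int(value)
    except (TypeError, ValueError, OverflowError):
        return 1
    return n if n > 1 else 1


def run_in_item_order(
    items: Sequence[Any], fn: Callable[[Any], Any], max_concurrency: Any = 1
) -> List[Any]:
    """Run fn over items and return results in item order, not completion order."""
    workers = coerce_max_concurrency(max_concurrency)
    if workers <= 1:
        # Sequential path: same behavior as a plain for loop.
        return [fn(item) for item in items]
    results: List[Any] = [None] * len(items)  # one preallocated slot per item
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(fn, item) for item in items]
        for index, future in enumerate(futures):
            # Collect in submission order so slot i always holds item i's result.
            results[index] = future.result()
    return results
```

Because results are read back by index, a consumer that reads them positionally sees the same list whether the items finished in order or not.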

Fan-out items are I/O-bound: each typically dispatches a command step that spawns a blocking agent-CLI subprocess, and the thread waiting on that subprocess releases the GIL, so a thread pool yields real wall-clock parallelism.

Two concurrency concerns needed care:

  • Per-item context isolation: each item runs against its own dataclasses.replace(context, item=…), so context.item is never clobbered across threads; the shared steps dict is written only on the disjoint parent:template:index key.
  • State persistence: RunState.save() previously serialized the live step_results dict via a plain open("w"), so a concurrent fan-out could both interleave on-disk writes and mutate the dict mid-json.dump (dictionary changed size during iteration). save() now runs under a per-run lock and writes atomically (temp file + os.replace), and per-item result recording goes through a small record_step_result helper under that lock. Sequential runs see only an uncontended lock.
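The per-item isolation point can be sketched as follows. `Context` and `run_item` are hypothetical stand-ins, not the engine's types; the sketch only assumes the context is a dataclass with a shared `steps` dict and an `item` field.

```python
import dataclasses
from typing import Any, Dict


@dataclasses.dataclass
class Context:
    """Hypothetical stand-in for the engine's step context."""

    steps: Dict[str, Any]
    item: Any = None


def run_item(context: Context, parent_id: str, template_id: str, index: int, item: Any) -> str:
    # dataclasses.replace makes a shallow copy: item is per-thread,
    # while steps still refers to the same shared dict.
    item_ctx = dataclasses.replace(context, item=item)
    key = f"{parent_id}:{template_id}:{index}"
    # Each item writes only its own key, so keys never collide across items.
    item_ctx.steps[key] = {"output": item_ctx.item}
    return key
```

The original `context.item` is never assigned, so concurrent items cannot overwrite each other's current item.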

A genuine exception escaping an item (as opposed to a normal step FAILED, which sets the run status) cancels outstanding work and re-raises, so the run is marked failed rather than reporting a vacuous completion.

Testing

  • Ran the workflow suite with .venv/bin/python -m pytest tests/test_workflows.py — 325 passed, including 15 new TestFanOutConcurrency cases: K≤1 sequential parity, item-order under forced reverse completion (event chain, no sleeps), real parallelism, max_concurrency coercion (0 / negative / None / non-int / string), per-thread item isolation, halt-on-failure prefix, and first-exception cancel + re-raise.
  • Ran the full suite — green except 3 pre-existing failures in unrelated branch-name tests (test_timestamp_branches / git extension) that fail identically on main.
  • uvx ruff check src/ tests/test_workflows.py — clean
  • Tested locally with uv run specify --help
  • Tested with a sample project (covered by the unit tests above)

AI Disclosure

  • I did not use AI assistance for this contribution
  • I did use AI assistance (describe below)

Code, tests, and this description were authored with AI assistance (Claude Code), from a fan-out concurrency investigation; everything was verified by running the repo's test suite and ruff locally.

@mnriem — would appreciate your review when you have a moment. Happy to swap the save() atomicity for a narrower lock if you'd prefer a smaller change.

Copilot AI left a comment

Contributor

Pull request overview

Implements actual parallel execution for workflow fan-out by honoring max_concurrency, while making RunState persistence safe under concurrent execution (locking + atomic JSON writes). This fits into the workflow engine’s execution model by enabling opt-in bounded parallelism for I/O-bound fan-out items without changing default sequential behavior.

Changes:

  • Add WorkflowEngine._run_fan_out() to execute fan-out items sequentially or via a bounded ThreadPoolExecutor depending on max_concurrency.
  • Make RunState.save() concurrency-safe via a per-run lock and atomic temp-file writes; route step result recording through a locked helper.
  • Add workflow tests covering fan-out concurrency behavior, ordering, coercion, and error/exception handling.
Summary per file:

| File | Description |
| --- | --- |
| src/specify_cli/workflows/engine.py | Adds bounded concurrent fan-out execution and hardens run-state persistence for concurrency. |
| tests/test_workflows.py | Adds a new test suite validating fan-out concurrency semantics and edge cases. |

Review details


  • Files reviewed: 2/2 changed files
  • Comments generated: 5
  • Review effort level: Low

Comment thread src/specify_cli/workflows/engine.py Outdated
Comment thread src/specify_cli/workflows/engine.py Outdated
Comment thread src/specify_cli/workflows/engine.py Outdated
Comment thread tests/test_workflows.py Outdated
Comment thread src/specify_cli/workflows/engine.py
@doquanghuy

doquanghuy commented Jun 29, 2026

Contributor Author

Thanks @copilot — all five points addressed at root cause (latest: d4479ed):

  1. Run-ahead after halt: replaced the submit-all-up-front loop with a sliding submission window (≤ max_workers in flight) that stops launching new items once the run is halting.
  2. Prefix could drop the halting item: halt is now attributed per item from each item's own recorded result (replaying the sequential break condition, honoring continue_on_error/aborted) rather than the shared run status a later concurrent item may have flipped — so the returned prefix includes the actual halting item, matching the sequential path. (An item that fails before recording, e.g. an unknown step type, is attributed too, since every item runs the same template.)
  3. Unlocked output mutation: the parent fan-out output update now routes through a new RunState.set_step_output() under the run lock, so it can't race a concurrent save().
  4. Docstring vs int() coercion: the docstring now accurately describes the behavior — numeric strings/floats are honored; only non-coercible or <= 1 runs sequentially.
  5. Flaky timing test: switched to a monotonic clock with a looser threshold while keeping a clear gap vs the serialized baseline.

Coverage added: concurrent halt-includes-halting-item, continue_on_error-does-not-truncate, and unknown-template-type-matches-sequential. Full workflows suite green (328 passed), ruff check clean, 30× stress on the concurrent path.
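A sliding submission window of the kind described in point 1, combined with the per-item halt attribution from point 2, might look roughly like this. It is a simplified sketch under stated assumptions: `run_windowed` and `should_halt` are hypothetical names, `workers` is assumed to be at least 1, and the halt test is reduced to a predicate on each item's own result.

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Any, Callable, Dict, List, Sequence


def run_windowed(
    items: Sequence[Any],
    fn: Callable[[Any], Any],
    workers: int,
    should_halt: Callable[[Any], bool],
) -> List[Any]:
    """Keep at most `workers` items in flight; return the in-order prefix
    up to and including the first halting item."""
    results: Dict[int, Any] = {}
    in_flight: Dict[Future, int] = {}
    next_index = 0
    halting = False
    with ThreadPoolExecutor(max_workers=workers) as pool:
        while in_flight or (not halting and next_index < len(items)):
            # Top up the window, but never launch new work once halting.
            while not halting and next_index < len(items) and len(in_flight) < workers:
                in_flight[pool.submit(fn, items[next_index])] = next_index
                next_index += 1
            done, _ = wait(list(in_flight), return_when=FIRST_COMPLETED)
            for future in done:
                index = in_flight.pop(future)
                results[index] = future.result()
                if should_halt(results[index]):
                    halting = True
    # Attribute the halt per item, in item order, from each item's own result.
    prefix: List[Any] = []
    for index in range(len(items)):
        if index not in results:
            break
        prefix.append(results[index])
        if should_halt(results[index]):
            break
    return prefix
```

Items that were already running when the halt was detected still finish, but anything after the first halting item is left out of the returned prefix.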

@mnriem could you give this a review when you get a chance? 🙏

…ut, faithful halt

Address the reviewer feedback on the bounded fan-out concurrency:

- Sliding submission window: keep at most `workers` items in flight and stop
  launching new items once the run is halting, instead of submitting all items
  up front (which let the pool keep starting queued work after a halt).
- Faithful halt prefix: attribute a halt to the specific item whose own
  recorded result halted the run (replaying the sequential break condition,
  honoring continue_on_error/aborted), not the shared run status a later
  concurrent item may have flipped. The returned prefix now includes the actual
  halting item, matching the sequential path. An item that fails before
  recording a result (e.g. an unknown step type) is attributed too, since every
  item runs the same template.
- Lock the parent fan-out output mutation: route the post-fan-out
  step_results[...]['output'] update through a new RunState.set_step_output()
  under the run lock, so it cannot race a concurrent save().
- Docstring: describe int() coercion accurately (numeric strings / floats are
  honored; only non-coercible or <= 1 runs sequentially).

Tests: add concurrent halt-includes-halting-item, continue_on_error-does-not-
truncate, and unknown-template-type-matches-sequential coverage; make the
timing test use a monotonic clock with a looser threshold to avoid CI flakiness.
@doquanghuy force-pushed the feat/3222-fanout-max-concurrency branch from 38c7798 to d4479ed on June 29, 2026 15:52
@mnriem requested a review from Copilot on June 29, 2026 16:50

Copilot AI left a comment

Contributor

Review details

  • Files reviewed: 2/2 changed files
  • Comments generated: 5
  • Review effort level: Low

Comment thread src/specify_cli/workflows/engine.py
Comment thread src/specify_cli/workflows/engine.py Outdated
Comment thread src/specify_cli/workflows/engine.py
Comment thread tests/test_workflows.py
Comment thread src/specify_cli/workflows/engine.py
- append_log: serialize the log_entries append + log.jsonl write under a
  dedicated RunState._log_lock so concurrent fan-out workers can't interleave
  or corrupt log lines (kept separate from the state lock; never nested).
- _run_fan_out.run_item: read the item output back through the item_ctx it
  executed against rather than the outer context closure — clearer and robust
  if StepContext ever stops sharing the steps dict by reference.
- StepBase: document the thread-safety contract — STEP_REGISTRY holds one shared
  instance per type, so concurrent fan-out invokes execute() on the same object;
  implementations must be stateless/thread-safe (the built-ins already are).
- test_concurrency_is_real: prove parallelism deterministically with a
  threading.Barrier (sequential execution can't clear it) instead of a
  wall-clock timing assertion.
@doquanghuy

Contributor Author

Thanks @copilot — second-pass comments addressed at root cause in ce352a3:

  1. append_log race (engine.py:388): append_log now serializes its log_entries append + log.jsonl write under a dedicated RunState._log_lock, so concurrent fan-out workers can't interleave/corrupt log lines. It's a separate lock from the state _lock (logging shouldn't contend with save()), and since append_log is never called while _lock is held, the two never nest.
  2. run_item returns via outer closure (engine.py:1004): now reads back through the item_ctx it executed against (item_ctx.steps) instead of the outer context.steps — same dict today, but clearer and robust if StepContext copying ever stops sharing by reference.
  3. PR description vs int() coercion (engine.py:990): honoring numeric strings/floats is the intended contract (locked by test_string_max_concurrency_is_honored), so I updated the PR description to describe the real int() behavior (the docstring already does).
  4. Flaky timing test (tests:2075): test_concurrency_is_real now proves parallelism deterministically with a threading.Barrier(n) — sequential execution can't clear it (times out → BrokenBarrierError), so there's no wall-clock threshold to tune or flake.
  5. Singleton step instances (engine.py:1054): documented the StepBase thread-safety contract: STEP_REGISTRY holds one shared instance per type, so concurrent fan-out invokes execute() on the same object; implementations must be stateless/thread-safe (the built-ins already are). I chose documenting the contract over instantiating a fresh step per execution to avoid a broader behavioral change to step construction. Happy to switch to fresh-instance-per-exec instead if you'd prefer that, @mnriem.

Full workflows suite green (328 passed), ruff check clean, 30× stress on the concurrency tests.
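The barrier technique from point 4 can be shown in isolation. This is an illustrative sketch rather than the repository's test: `ran_in_parallel` is a hypothetical helper, and the timeout value is an arbitrary assumption.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def ran_in_parallel(n: int, workers: int, timeout: float = 2.0) -> bool:
    """Return True only if n tasks were running at the same time."""
    barrier = threading.Barrier(n)

    def task(_: int) -> bool:
        # Blocks until n threads are waiting; raises BrokenBarrierError on timeout.
        barrier.wait(timeout=timeout)
        return True

    try:
        with ThreadPoolExecutor(max_workers=workers) as pool:
            return all(pool.map(task, range(n)))
    except threading.BrokenBarrierError:
        # Fewer than n tasks ever ran concurrently, so the barrier never cleared.
        return False
```

With a single worker the first task waits alone, the barrier times out, and the function returns False; the timeout only bounds how long that failing case takes and is not a threshold the passing case has to beat.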

@mnriem would appreciate your review when you have a chance 🙏

Copilot AI left a comment

Contributor

Review details

  • Files reviewed: 3/3 changed files
  • Comments generated: 2
  • Review effort level: Low

Comment thread src/specify_cli/workflows/engine.py Outdated
Comment thread src/specify_cli/workflows/engine.py Outdated
…y cancel semantics

- RunState.save(): move the updated_at timestamp assignment inside the run lock
  so the timestamp matches the snapshot the thread serializes and concurrent
  savers don't race on it.
- _run_fan_out docstring: clarify that on a halt only not-yet-started items are
  cancelled; items already running finish but their outputs are ignored
  (Future.cancel() can't stop running work, and the pool joins on exit).
@doquanghuy

Contributor Author

Thanks @copilot — both addressed at root cause in 6f03222:

  1. save() stamps updated_at outside the lock (engine.py:437): moved the updated_at = … assignment inside the with self._lock: block, so the timestamp matches the snapshot the thread actually serializes and concurrent savers no longer race on it.
  2. Cancel-semantics overstatement (engine.py:986): reworded the docstring — on a halt, only items that haven't started are cancelled; items already running are allowed to finish but their outputs are ignored (Future.cancel() can't stop running work, and the ThreadPoolExecutor joins on exit).
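The save() shape discussed in this thread (timestamp set inside the lock, then a temp file swapped in with os.replace) can be sketched as below. `RunStateSketch` is a hypothetical stand-in; the field names and JSON layout are assumptions for illustration, not the real RunState schema.

```python
import json
import os
import tempfile
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional


class RunStateSketch:
    """Hypothetical stand-in for RunState; only the locking and atomic write matter."""

    def __init__(self, path: str) -> None:
        self.path = Path(path)
        self.step_results: Dict[str, Any] = {}
        self.updated_at: Optional[str] = None
        self._lock = threading.Lock()

    def record_step_result(self, key: str, result: Any) -> None:
        with self._lock:  # writers never mutate the dict while save() serializes it
            self.step_results[key] = result

    def save(self) -> None:
        with self._lock:
            # Stamp inside the lock so the timestamp matches the serialized snapshot.
            self.updated_at = datetime.now(timezone.utc).isoformat()
            payload = {"updated_at": self.updated_at, "step_results": self.step_results}
            fd, tmp_name = tempfile.mkstemp(dir=str(self.path.parent), suffix=".tmp")
            try:
                with os.fdopen(fd, "w", encoding="utf-8") as handle:
                    json.dump(payload, handle)
                # Readers see either the old file or the new one, never a partial write.
                os.replace(tmp_name, self.path)
            except BaseException:
                if os.path.exists(tmp_name):
                    os.unlink(tmp_name)
                raise
```

The temp file is created in the destination directory so that os.replace stays on one filesystem.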

Full workflows suite green (328 passed), ruff check clean.

@mnriem thanks again for merging #3225 — would appreciate your review on this one too when you have a moment 🙏

Copilot AI left a comment

Contributor

Review details

  • Files reviewed: 3/3 changed files
  • Comments generated: 1
  • Review effort level: Low

Comment thread src/specify_cli/workflows/engine.py
The concurrent fan-out path invokes _execute_steps from worker threads, which
calls the engine's on_step_start callback (the CLI sets it to a console.print
lambda). Concurrent invocation could interleave/garble progress output. Guard
the call with a WorkflowEngine._callback_lock so callbacks are serialized;
the lock is uncontended for sequential runs.
@doquanghuy

doquanghuy commented Jun 29, 2026

Contributor Author

Thanks @copilot — addressed at root cause in f78db82:

Concurrent on_step_start (engine.py:1034): the callback (a console.print lambda from the CLI) was invoked from worker threads during concurrent fan-out and could interleave/garble output. It's now serialized under a new WorkflowEngine._callback_lock for the duration of the call, so concurrent workers can't interleave it (uncontended for sequential runs). This also protects any other non-thread-safe on_step_start implementation.
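Serializing a progress callback this way might look like the following minimal sketch. `EngineSketch` and `notify_step_start` are hypothetical; only the `_callback_lock` name and the `on_step_start` callback come from the description above.

```python
import threading
from typing import Callable, Optional


class EngineSketch:
    """Hypothetical stand-in for the engine; shows only the callback guard."""

    def __init__(self, on_step_start: Optional[Callable[[str], None]] = None) -> None:
        self.on_step_start = on_step_start
        self._callback_lock = threading.Lock()

    def notify_step_start(self, step_id: str) -> None:
        if self.on_step_start is None:
            return
        # Hold the lock for the whole call so two workers never run the
        # callback at the same time; sequential runs never contend on it.
        with self._callback_lock:
            self.on_step_start(step_id)
```

One consequence of this design is that a slow callback delays other workers' notifications, which is usually acceptable for console progress output.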

Full workflows suite green (328 passed), ruff check clean.

@mnriem would appreciate your review when you have a chance 🙏

Copilot AI left a comment

Contributor

Review details

  • Files reviewed: 3/3 changed files
  • Comments generated: 1
  • Review effort level: Low

Comment thread src/specify_cli/workflows/engine.py Outdated
…eback

In _run_fan_out's concurrent path, a worker exception was stashed in first_exc
and re-raised after the loop. Re-raise it from within the except block with a
bare `raise` (after cancelling outstanding futures) so the original traceback is
preserved, and drop the now-unneeded first_exc variable. The ThreadPoolExecutor
__exit__ still joins any already-running workers before the exception escapes.
@doquanghuy

Contributor Author

Thanks @copilot — addressed at root cause in 73ced6a:

Worker exception traceback (engine.py:1104): the concurrent fan-out no longer stashes the exception in first_exc to re-raise after the loop. It now re-raises in place with a bare raise (after cancelling outstanding futures), preserving the original traceback, and the first_exc variable is gone. The ThreadPoolExecutor __exit__ still joins any already-running workers before the exception escapes.
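The cancel-then-bare-raise pattern can be sketched on its own. For brevity this hypothetical `run_all_or_raise` submits every item up front, which differs from the sliding window the PR uses; only the exception handling is the point here.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Sequence


def run_all_or_raise(items: Sequence[Any], fn: Callable[[Any], Any], workers: int) -> List[Any]:
    results: List[Any] = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(fn, item) for item in items]
        try:
            for future in futures:
                results.append(future.result())
        except Exception:
            for pending in futures:
                # cancel() only stops work that has not started yet.
                pending.cancel()
            # A bare raise re-raises the active exception with its original
            # traceback, including the frames from the worker thread.
            raise
    # On the error path, the pool's __exit__ still joins running workers
    # before the exception leaves this function.
    return results
```

Compared with stashing the exception in a variable and raising it after the loop, there is no extra state to track and the traceback still leads back to the failing item.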

Full workflows suite green (328 passed), ruff check clean. test_first_exception_cancels_and_reraises still verifies the cancel + re-raise path.

@mnriem would appreciate your review when you have a moment 🙏

Copilot AI left a comment

Contributor

Review details

  • Files reviewed: 3/3 changed files
  • Comments generated: 3
  • Review effort level: Low

Comment thread src/specify_cli/workflows/engine.py
Comment thread src/specify_cli/workflows/engine.py Outdated
Comment thread src/specify_cli/workflows/engine.py
…te, bound workers

Address third review pass:

- Remove the unlocked `context.steps[step_id]["output"] = …` writes in the
  fan-out parent update. context.steps[step_id] is the same dict object that
  set_step_output() updates under the run lock, so the direct (unsynchronized)
  mutation was redundant.
- Preserve sequential halt semantics under concurrency: a later in-flight item
  could overwrite state.status after the halting item was identified. _run_fan_out
  now derives the halting item's run status (item_halt_status, replacing the bool
  item_halted) and restores it after the pool joins, so the final status is the
  first halting item's outcome.
- Bound the pool: workers = min(max_concurrency, len(items)) and early-return for
  empty items, so a user-controlled max_concurrency can't over-allocate threads.

Add coverage that an earlier PAUSED item's status wins over a later concurrent
FAILED item.
@doquanghuy

Contributor Author

Thanks @copilot — all three addressed at root cause in 56caa29:

  1. Redundant unlocked output write (engine.py:958): removed the context.steps[step_id]["output"] = … writes. context.steps[step_id] is the same dict object that set_step_output() updates under the run lock, so the direct unsynchronized mutation was redundant — the lock now covers it on the shared object.
  2. Status clobbered by a later in-flight item (engine.py:1116): _run_fan_out now derives the halting item's run status per item (item_halt_status, replacing the bool item_halted) and restores state.status to it after the pool joins, so a later concurrent item (e.g. FAILED) can't overwrite an earlier halting item's status (e.g. PAUSED). Added a test that an earlier PAUSED item wins over a later FAILED one.
  3. Unbounded pool (engine.py:1039): workers = min(max_concurrency, len(items)) plus an early return for empty items, so a user-controlled max_concurrency can't over-allocate threads.
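Points 2 and 3 reduce to two small rules, sketched here with hypothetical helpers. Statuses are modelled as plain strings and `None` means "this item did not halt"; the engine's real status type is not shown in this thread.

```python
from typing import List, Optional


def bounded_workers(max_concurrency: int, item_count: int) -> int:
    """Never allocate more threads than there are items; 0 means nothing to run."""
    if item_count == 0:
        return 0  # caller returns early instead of creating a pool
    return min(max_concurrency, item_count)


def first_halt_status(item_halt_statuses: List[Optional[str]]) -> Optional[str]:
    """Pick the halt status of the earliest item in item order, ignoring
    whatever a later concurrent item wrote to the shared run status."""
    for status in item_halt_statuses:
        if status is not None:
            return status
    return None
```

For example, with per-item statuses `[None, "paused", "failed"]` the run ends up paused, which is what a sequential run would have reported because it would have stopped at the second item.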

Full workflows suite green (329 passed), ruff check clean, 30× stress on the concurrency tests.

@mnriem would appreciate your review when you have a moment 🙏

Copilot AI left a comment

Contributor

Review details

  • Files reviewed: 3/3 changed files
  • Comments generated: 2
  • Review effort level: Low

Comment thread src/specify_cli/workflows/engine.py Outdated
Comment thread src/specify_cli/workflows/engine.py
@mnriem

mnriem commented Jun 30, 2026

Collaborator

Please address the Copilot feedback. If any of it is not applicable, please explain why. Please also resolve the conflicts.

…step_results

On a resume run, StepContext is built with steps=state.step_results, so the two
direct `context.steps[...] = ...` writes mutated the shared dict outside the run
lock and could race save(). Route both through a new _record_result helper that
mirrors into context.steps only when it is a distinct object (a fresh run) and
otherwise relies solely on record_step_result's locked write.
…x-concurrency

# Conflicts:
#	tests/test_workflows.py
@doquanghuy

Contributor Author

@mnriem done on both counts:

Conflicts resolved — merged latest upstream/main into the branch (merge 3462666). The only real conflict was in tests/test_workflows.py, where this PR's TestFanOutConcurrency and the now-merged #3225 TestFanInWaitForValidation were both added at the same spot; kept both. PR is now mergeable.

Copilot feedback addressed (b385796):

  • engine.py:808 & :938 — unlocked context.steps writes during resume: on a resume run StepContext is built with steps=state.step_results (same dict), so the two direct context.steps[...] = ... writes mutated the shared dict outside RunState._lock and could race save(). Both now route through a new _record_result helper that mirrors into context.steps only when it's a distinct object (a fresh run), and otherwise relies solely on record_step_result's locked write — so no unlocked write to the shared dict.
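The mirroring rule can be sketched with an identity check. `StateSketch` and `record_result` are hypothetical stand-ins for RunState and the `_record_result` helper; the sketch assumes the only difference between a fresh run and a resume is whether the context's steps dict is the same object as `step_results`.

```python
import threading
from typing import Any, Dict


class StateSketch:
    """Hypothetical stand-in for RunState with a locked result writer."""

    def __init__(self) -> None:
        self.step_results: Dict[str, Any] = {}
        self._lock = threading.Lock()

    def record_step_result(self, key: str, result: Any) -> None:
        with self._lock:
            self.step_results[key] = result


def record_result(state: StateSketch, context_steps: Dict[str, Any], key: str, result: Any) -> None:
    # The locked write always happens.
    state.record_step_result(key, result)
    # Mirror only when the context holds a distinct dict (a fresh run).
    # On resume both names refer to one object, so a second, unlocked
    # write to it would be redundant and could race save().
    if context_steps is not state.step_results:
        context_steps[key] = result
```

The check uses `is not` rather than `!=` because two distinct dicts with equal contents still need the mirror write.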

Full workflows suite green (340 passed, including the merged fan-in tests), ruff check clean. The earlier rounds of Copilot feedback are all addressed in the commits above too. CI shows action_required — I believe it needs a maintainer to approve the workflow run on this fork PR. 🙏

Copilot AI left a comment

Contributor

Review details

  • Files reviewed: 3/3 changed files
  • Comments generated: 0 new
  • Review effort level: Low

@mnriem mnriem merged commit 20f4306 into github:main Jun 30, 2026
12 checks passed
@mnriem

mnriem commented Jun 30, 2026

Collaborator

Thank you!

kanfil added a commit to tikalk/agentic-sdlc-spec-kit that referenced this pull request Jun 30, 2026
Upstream commits merged (19):
- Retire iflow/roo/windsurf integrations (github#3166, github#3167, github#3168, github#3211, github#3212, github#3213)
- Move version_satisfies to _utils.py, allow prereleases (github#2695)
- Workflow fan-out max_concurrency via bounded thread pool (github#3224)
- Reject bool max_iterations in while/do-while validation (github#3237)
- bash 3.2 portability: echo→printf, ${word^^}→tr (github#3192)
- --no-persist in common.sh for read-only path resolution (github#3025)
- Reject host-less catalog URLs (github#3209, github#3227)
- Extension updates: Intake v0.1.3, Architecture Workflow v1.2.2,
  Repository Governance, Workflow Preset v1.3.11
- Release 0.12.1 → 0.12.2 → 0.12.3.dev0 (github#3253, github#3259)
- CI Python matrix alignment + bash 3.2 portability (github#3244)
- Docs: Windsurf→Kilo Code references throughout

Conflicts resolved (2):
- pyproject.toml: kept fork name/description, bumped to 0.12.2+adlc1
- AGENTS.md: accepted upstream's condensed agent table (retired agents removed)

Assisted-by: opencode (model: glm-5.2, supervised)


Development

Successfully merging this pull request may close these issues.

[Feature]: fan-out ignores max_concurrency — items run sequentially

3 participants